Jar 作业开发
1 Maven 添加pom依赖:
<dependency>
<groupId>cn.tongdun.spark</groupId>
<artifactId>jobapi</artifactId>
<version>1.0.6-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
2 实现接口 cn.tongdun.spark.jobapi.SparkJob:
class ScalaJobDemo extends SparkJob {
/**
* 该方法为作业的执行主入口
*/
override def runJob(sparkSession: SparkSession, logger: SparkJobLogger, args: Array[String]): Unit = {
logger.info("execute SparkJobDemo")
val schemaString = "Id Name Age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))
val schema = StructType(fields)
logger.info("add hdfs://tdhdfs/user/tao.fu/testdata")
val row = sparkSession.sparkContext.textFile("hdfs://tdhdfs/user/tao.fu/testdata")
val rowRdd = row.map(_.split(",")).map(attr => Row(attr(0).trim, attr(1).trim, attr(2).trim))
logger.info("create table tdl_spark_people")
val peopleDf = sparkSession.createDataFrame(rowRdd, schema).createOrReplaceTempView("tdl_spark_people")
logger.info("execute select * from tdl_spark_people")
logger.info("Result: "+sparkSession.sql("select * from tdl_spark_people").collectAsList())
}
}
3 继承 cn.tongdun.spark.jobapi.
AbstractSparkJob 实现 SparkJob接口,提供数据管道功能, 数据管道: 上游作业实例可以传递数据给下游作业实例,仅支持字符串数据,管道大小限制512k。
AbstractSparkJob 操作管道提供如下两个方法: setPipelineData 给管道设置数据, 提供给下游作业实例使用 getPipelineData 从管道获取上游作业实例传输的数据
4 部署应用
将自定义实现的类打成jar包,上传至dataocean平台,新建job作业类型为Spark jar,输入以下指令
[set key=value;]
jar包名 入口类名 自定义参数 [--jars jar1,jar2]
可以输入多行命令,各行之间分号分割
例如:jobdemo-0.1.0.jar cn.tongdun.spark.demo.SparkJobDemo param1 param2;
jobdemo-0.1.0.jar cn.tongdun.spark.demo.SparkJobDemo param3 param4
任务的提交支持添加相关依赖,当目标jar包需要依赖相关包时,可以事先将依赖包上传至平台,在--jars中指定依赖包名称,以逗号分割 例如:
set spark.sql.shuffle.partitions = 5;
jobserverdemo-1.0-SNAPSHOT.jar cn.tongdun.spark.SparkJobDemo --jars flume-ng-sdk-1.8.0.jar(建议是绝对路径)
flume-ng-sdk-1.8.0.jar需首先上传至dataocean平台
用户可以自定义spark的执行参数,在命令行之前加上 set key=value; 例如 set spark.sql.shuffle.partitions = 5;(可以设置多个参数,以分号间隔) unset 命令可以重置设定的参数 例如 unset spark.sql.shuffle.partitions (此时spark.sql.shuffle.partitions将被设定为系统默认值)
5 注意事项
- 在runJob方法中不要捕获异常,直接将异常抛出,另外不要调用System.exit()等方法
- 查看日志采用SparkJobLogger自带的方法,包括info、warn、error 3种级别
- 创建临时表推荐使用createOrReplaceTempView 表名需包含至少3个字段,以_作分割,并且以tdl开头,例如tdl_tmp_activity